
Add function metadata ability to push down struct argument in optimizer #25175


Merged: 1 commit, Jun 4, 2025

Conversation

@kevintang2022 (Contributor) commented May 22, 2025

Summary:
For some user-defined functions, the subfield pushdown optimizer should transparently push down the utilized subfields of a struct-typed argument. The goal is to make the query plan look the same as if the UDF were not being called on the struct. To accomplish this, the optimizer needs to take the struct argument passed into the UDF and unwrap it when converting an expression to a subfield.

Since there is no guarantee that the struct argument is always the first argument of the UDF, the UDF needs to specify in its metadata which argument index to push down.
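To illustrate the idea, here is a minimal sketch with hypothetical stand-in classes (Presto's real rewrite operates on RowExpression/Subfield, not these toy types): given the argument index from the function metadata, a dereference of the UDF call is rewritten into a subfield rooted at the struct argument.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for the optimizer's expression nodes.
public class PushdownSketch {
    public record Column(String name) {}
    public record Call(String function, int pushdownSubfieldArgIndex, List<Object> arguments) {}

    // Convert a dereference of a call, e.g. fb_reshape_row(person, ...).age,
    // into a subfield rooted at the struct argument named by the metadata.
    static String toSubfield(Call call, String field) {
        int idx = call.pushdownSubfieldArgIndex();
        if (idx >= 0 && idx < call.arguments().size()
                && call.arguments().get(idx) instanceof Column column) {
            return column.name() + "." + field; // unwrap the UDF call
        }
        return null; // metadata absent (-1) or argument is not a plain column
    }

    public static void main(String[] args) {
        Call reshape = new Call("fb_reshape_row", 0, Arrays.asList(new Column("person"), null));
        System.out.println(toSubfield(reshape, "age")); // prints person.age
        Call opaque = new Call("fb_reshape_row_old", -1, Arrays.asList(new Column("person"), null));
        System.out.println(toSubfield(opaque, "age")); // prints null
    }
}
```

With index 0 the optimizer can record `person.age` as the required subfield, which is exactly what the `[person.age]` annotation in the table scan of the expected plan below shows.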

T224244100

Presto version 0.293-20250525.210422-369

Differential Revision: D74738214

Test plan:
With this change, both of the queries below produce the same query plan after the table scan node rewrite

explain with shaped as (SELECT fb_reshape_row(person,CAST(NULL AS ROW(age INTEGER, city VARCHAR))) AS pcol FROM tangk_struct_table),
raw as (select person as pcol from tangk_struct_table)
select pcol.age from raw;
explain with shaped as (SELECT fb_reshape_row(person,CAST(NULL AS ROW(age INTEGER, city VARCHAR))) AS pcol FROM tangk_struct_table),
raw as (select person as pcol from tangk_struct_table)
select pcol.age from shaped;

20250525_235045_00003_tu7a9 correct query plan with pushed down subfield

Fragment 0 [SINGLE]
    CPU: 0.00ns, Scheduled: 0.00ns, Input: 0 rows (0B); per task: avg.: 0.00 std.dev.: 0.00, Output: 0 rows (0B), 1 tasks
    Output layout: [field]
    Output partitioning: SINGLE []
    Output encoding: COLUMNAR
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - Output[PlanNodeId 6][Query Plan] => [field:varchar(807)]
            Query Plan := field
        - Values[PlanNodeId 0] => [field:varchar(807)]
                (VARCHAR'- Output[PlanNodeId 10][age] => [expr_3:integer]
                        age := expr_3 (3:8)
                    - RemoteStreamingExchange[PlanNodeId 218][GATHER - COLUMNAR] => [expr_3:integer]
                        - ScanProject[PlanNodeId 0,6][table = TableHandle {connectorId=''prism'', connectorHandle=''PrismTableHandle{schemaName=di, tableName=tangk_struct_table, analyzePartitionValues=Optional.empty, sideTableFeatureIds=[]}'', layout=''Optional[di.tangk_struct_table{}]''}, projectLocality = LOCAL] => [expr_3:integer]
                                expr_3 := DEREFERENCE(fb_reshape_row(person, null), INTEGER''0'') (1:114)
                                LAYOUT: di.tangk_struct_table{}
                                person := person:struct<age:int,city:string>:0:REGULAR:[person.age] (1:113)
                                id:bigint:-13:PARTITION_KEY
                                    :: [["1"], ["2"], ["3"], ["4"], ["5"]]
                ')

20250525_235408_00004_tu7a9 query plan with non relevant function

Fragment 0 [SINGLE]
    CPU: 0.00ns, Scheduled: 0.00ns, Input: 0 rows (0B); per task: avg.: 0.00 std.dev.: 0.00, Output: 0 rows (0B), 1 tasks
    Output layout: [field]
    Output partitioning: SINGLE []
    Output encoding: COLUMNAR
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - Output[PlanNodeId 6][Query Plan] => [field:varchar(798)]
            Query Plan := field
        - Values[PlanNodeId 0] => [field:varchar(798)]
                (VARCHAR'- Output[PlanNodeId 10][age] => [expr_3:integer]
                        age := expr_3 (3:8)
                    - RemoteStreamingExchange[PlanNodeId 218][GATHER - COLUMNAR] => [expr_3:integer]
                        - ScanProject[PlanNodeId 0,6][table = TableHandle {connectorId=''prism'', connectorHandle=''PrismTableHandle{schemaName=di, tableName=tangk_struct_table, analyzePartitionValues=Optional.empty, sideTableFeatureIds=[]}'', layout=''Optional[di.tangk_struct_table{}]''}, projectLocality = LOCAL] => [expr_3:integer]
                                expr_3 := DEREFERENCE(fb_reshape_row_old(person, null), INTEGER''0'') (1:118)
                                LAYOUT: di.tangk_struct_table{}
                                person := person:struct<age:int,city:string>:0:REGULAR (1:117)
                                id:bigint:-13:PARTITION_KEY
                                    :: [["1"], ["2"], ["3"], ["4"], ["5"]]
                ')

20250525_235845_00005_tu7a9 expected plan

Fragment 0 [SINGLE]
    CPU: 0.00ns, Scheduled: 0.00ns, Input: 0 rows (0B); per task: avg.: 0.00 std.dev.: 0.00, Output: 0 rows (0B), 1 tasks
    Output layout: [field]
    Output partitioning: SINGLE []
    Output encoding: COLUMNAR
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - Output[PlanNodeId 6][Query Plan] => [field:varchar(783)]
            Query Plan := field
        - Values[PlanNodeId 0] => [field:varchar(783)]
                (VARCHAR'- Output[PlanNodeId 10][age] => [expr_4:integer]
                        age := expr_4 (3:8)
                    - RemoteStreamingExchange[PlanNodeId 211][GATHER - COLUMNAR] => [expr_4:integer]
                        - ScanProject[PlanNodeId 0,6][table = TableHandle {connectorId=''prism'', connectorHandle=''PrismTableHandle{schemaName=di, tableName=tangk_struct_table, analyzePartitionValues=Optional.empty, sideTableFeatureIds=[]}'', layout=''Optional[di.tangk_struct_table{}]''}, projectLocality = LOCAL] => [expr_4:integer]
                                expr_4 := DEREFERENCE(person, INTEGER''0'') (2:17)
                                LAYOUT: di.tangk_struct_table{}
                                person := person:struct<age:int,city:string>:0:REGULAR:[person.age] (2:36)
                                id:bigint:-13:PARTITION_KEY
                                    :: [["1"], ["2"], ["3"], ["4"], ["5"]]
                ')


Verifier suite build: 20250521_205359_71488_cm4iz

pt suite build --predicate "lower(query) like '%fb_reshape_row%'" --suite atn_fb_reshape_row_subfields_udf --region atn --days 100

UDF only
https://www.internalfb.com/intern/presto/verifier/results/?test_id=223902
General
https://our.intern.facebook.com/intern/presto/verifier/results/?test_id=223903

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

General Changes
* Add pushdownSubfieldArgIndex parameter to ComplexTypeFunctionDescriptor for subfield optimization during query planning

@kevintang2022 kevintang2022 requested a review from a team as a code owner May 22, 2025 17:10
@facebook-github-bot (Collaborator)

This pull request was exported from Phabricator. Differential Revision: D74738214

@kevintang2022 kevintang2022 requested a review from rschlussel May 22, 2025 17:22
@kevintang2022 changed the title from "fb_reshape_row always push down subfields" to "Add function metadata ability to push down struct argument in optimizer" May 22, 2025
@rschlussel (Contributor) left a comment

Looks good! Can you add a test? Maybe add one to TestHiveLogicalPlanner verifying that the subfields are being pushed down, plus a query correctness test (see TestLambdaSubfieldPruning for examples of tests for a related feature). You may have to register a function in the test that uses the new field you added (e.g. a passthrough function that takes a row and returns it unchanged) to exercise your code.

@@ -33,4 +33,6 @@
boolean deterministic() default true;

boolean calledOnNullInput() default false;

int pushdownSubfieldArgIndex() default -1;
Contributor

This interface method, and the ones on ScalarFunction and SqlInvokedScalarFunction, are parameters to an annotation. Does a user implementing a function using the Presto SPI manually specify this value in the annotation? If not, then these should be removed.

Contributor Author

Yes, they are specified in the annotation. If not specified, the value defaults to -1.

The annotation looks like this:

@CodegenScalarFunction(value = "function_name", calledOnNullInput = true, pushdownSubfieldArgIndex = 0)

@tdcmeehan (Contributor) commented May 22, 2025

Can you add to our documentation some notes on how this needs to be used?

Also, why not add an annotation to the argument itself? Something like @RowMayBeDereferenced. (BTW, can you give an example of how a function could know that this is safe to do?) You could annotate multiple of them, and we could validate that the argument is, indeed, a struct to begin with.

Also, your example references internal queries; can you add pastes of the explain plans?

Contributor

Can you also add some tests?

Contributor Author

Instead of using an annotation on the argument, I have chosen to pass the arg index in the codegen decorator because this allows it to live inside the FunctionMetadata.

I added some tests to TestHiveLogicalPlanner. One more change I will make is to validate that the specified arg index does correspond to a row type, and to throw a warning when the code path is not reached due to an invalid index.

Contributor Author

@tdcmeehan @rschlussel
Looking into the ComplexTypeDescriptor approach. I think it requires some changes to FunctionExtractor. The reshape function is registered through a plugin, and currently the Plugin interface only supports annotated function definitions.

I tried adding this snippet to FunctionExtractor, but this approach does not seem ideal. It also assumes that the SqlScalarFunction instance has only a single, no-arg constructor. The way other SqlScalarFunctions get registered is inside BuiltInTypeAndFunctionNamespaceManager, and they are all BuiltInFunctions because SqlScalarFunction is an extension of BuiltInFunction.

        if (SqlFunction.class.isAssignableFrom(clazz)) {
            try {
                return new FunctionListBuilder()
                        .function((BuiltInFunction) clazz.getConstructor().newInstance())
                        .getFunctions();
            }
            catch (Exception e) {
                throw new RuntimeException(format("Error adding BuiltInFunction %s", e.getMessage()));
            }
        }

I also don't see any examples of a SqlScalarFunction added through a plugin. If you look at any Plugin implementation, the getFunctions override only adds functions that are annotated, most of them with @ScalarFunction. Take, for example, I18nFunctionsPlugin.

I'm wondering if you two have any other thoughts on this approach and other things we need to consider.

Contributor

I agree that ComplexTypeFunctionDescriptor would be the best place for it. However, SqlScalarFunction is in presto-main-base. You can't use it from a plugin. You need a way to generate it from an annotation.

What if we added the ability to specify the ComplexTypeFunctionDescriptor from an annotation, like what was removed in commit d6e008c (and added earlier in the same PR)? I can't remember why it was removed.

Contributor

I see the problem now. Plugin-registered functions must use annotations and this function is registered through the Plugin SPI, which means we can't incorporate the feedback above directly (for example, from within a Plugin you won't even be able to subclass SqlScalarFunction).

So taking a step back, the main thing I'm not in favor of in this PR is adding pushdownSubfieldArgIndex to our scalar function annotations (going back to scope of API addition vs. what we get out of it). We also already use ComplexTypeFunctionDescriptor to instruct PushdownSubfields on how to interact with complex types, but these all apply to maps and arrays, and they're functions that are built-in and can use the verbose SqlScalarFunction definition method above because they're defined in the main module.

I can think of a few ways of going about it.

  1. Trino has a FunctionProvider SPI that lets you lazily build scalar functions, that could work to allow us to verbosely construct this internal function from within the SPI. (Similarly, we could allow SqlScalarFunction to annotate a no-arg static factory method which returns a concrete instance of SqlFunction).
  2. We could add an annotation that allows one to define companion information in the function, such as ComplexTypeFunctionDescriptor. Or we could add a separate annotation that allows the fields in ComplexTypeFunctionDescriptor to be specified independently, though this would be awkward at the method level, and I think it would be more appropriate at the argument level.

I'm also wondering, looking at ComplexTypeFunctionDescriptor, if the functionality we're adding here can already be accomplished by the code in this class, specifically the outputToInputTransformationFunction field.

Contributor

I would be in favor of restoring an annotation, but I think the one above was mostly a 1:1 copy of ComplexTypeFunctionDescriptor and can be improved (I don't think it should have a StaticMethodPointer that references internal engine classes, and argumentIndicesContainingMapOrArray should just be reworked to be per-argument).

Contributor

Okay, so I think the next steps here are:
1. Add the new passthrough field to ComplexTypeFunctionDescriptor instead of the scalar annotation; it can then be used in SubfieldPushdown.
2. Create an annotation for creating a ComplexTypeFunctionDescriptor (it probably doesn't need to be fully featured at this point; we can add to it as needed), and add parsing ability for the annotation when generating the function metadata.
3. Use the new annotation in fb_reshape_row.
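As a rough illustration of step 2 (all names here are hypothetical, not Presto's actual SPI): an annotation could carry the descriptor fields, and the function metadata generation could read it by reflection from the annotated method.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class DescriptorAnnotationSketch {
    // Hypothetical annotation mirroring a ComplexTypeFunctionDescriptor field;
    // illustrative only.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface ComplexTypeDescriptor {
        int pushdownSubfieldArgIndex() default -1;
    }

    // A passthrough function: returns its row argument unchanged, so its
    // subfields can safely be pushed down to the table scan.
    @ComplexTypeDescriptor(pushdownSubfieldArgIndex = 0)
    public static Object passthroughRow(Object row) {
        return row;
    }

    public static void main(String[] args) throws Exception {
        // Metadata generation would read the annotation like this:
        ComplexTypeDescriptor descriptor = DescriptorAnnotationSketch.class
                .getMethod("passthroughRow", Object.class)
                .getAnnotation(ComplexTypeDescriptor.class);
        System.out.println(descriptor.pushdownSubfieldArgIndex()); // prints 0
    }
}
```

The default of -1 preserves today's behavior for functions that do not opt in, matching the default proposed in the release note.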

@steveburnett (Contributor)

Suggest adding a release note, or a NO RELEASE NOTE block, as appropriate.

@facebook-github-bot (Collaborator)

@kevintang2022 has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.


@kevintang2022 kevintang2022 force-pushed the export-D74738214 branch 2 times, most recently from 2c2555f to 2c73136 Compare May 27, 2025 02:07
@rschlussel (Contributor) left a comment

thanks for the new tests!

}
checkCondition(signature.getArgumentTypes().size() > pushdownSubfieldArgIndex, FUNCTION_IMPLEMENTATION_ERROR, "Method [%s] has out of range pushdown subfield arg index", method);
String typeVariableName = signature.getArgumentTypes().get(pushdownSubfieldArgIndex).toString();
checkCondition(typeVariableName.equals(com.facebook.presto.common.type.StandardTypes.ROW) || typeConstraintMapping.get(typeVariableName).equals(com.facebook.presto.common.type.StandardTypes.ROW), FUNCTION_IMPLEMENTATION_ERROR, "Method [%s] does not have a struct or row type as pushdown subfield arg", method);
Contributor

Or it has no typeConstraintMapping entry (maybe there's no constraint on the type), or the typeConstraintMapping value is null and TypeVariableConstraint.nonDecimalNumericRequired is false (i.e. there's no constraint preventing a row type)?

@kevintang2022 (Contributor Author) commented May 27, 2025

I don't think this is too restrictive. The argument index is only checked for these conditions if pushdownSubfieldArgIndex.isPresent().

Also, the typeVariableName should always be present because signature.getArgumentTypes() always contains the same number of parameters as the function signature. There is always a type constraint imposed on a positional argument.

I think you might have it confused with typeVariableConstraints. typeVariableConstraints might be empty if we don't use an alias for a type (T for RowType). typeConstraintMapping is built using typeVariableConstraints.

So my check ensures that the parameter is either directly named a "row" type, or it has a type alias that maps to a "row" type

In summary: typeVariableConstraints is retrieved from this annotation: @TypeParameter(value = "T", boundedBy = ROW)

argumentTypes is retrieved from the function definition: public static Block customStructWithPassthrough(@SqlType("T") Block struct)
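The two-part check described above can be sketched in isolation (simplified to strings with hypothetical names; the real check reads the parsed signature and TypeVariableConstraint objects):

```java
import java.util.Map;

public class RowTypeCheckSketch {
    // typeConstraintMapping maps a type variable name to its variadic bound,
    // e.g. "T" -> "row" from @TypeParameter(value = "T", boundedBy = ROW).
    static boolean isRowArgument(String argumentTypeName, Map<String, String> typeConstraintMapping) {
        // Either the argument is declared directly with the "row" type name...
        if (argumentTypeName.equals("row")) {
            return true;
        }
        // ...or it is a type variable whose bound is "row".
        return "row".equals(typeConstraintMapping.get(argumentTypeName));
    }

    public static void main(String[] args) {
        System.out.println(isRowArgument("T", Map.of("T", "row")));  // prints true
        System.out.println(isRowArgument("row", Map.of()));          // prints true
        System.out.println(isRowArgument("bigint", Map.of()));       // prints false
    }
}
```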


Contributor Author

To make it clearer, here's an example of what the variables look like when the function takes more than one parameter:

(screenshots omitted: variable values for a multi-parameter function)

Contributor

Is it possible for something of a generic type "T" to accept a row type but not have variadicBound "row"? E.g. maybe it can be used for all types, so it doesn't have a TypeVariableConstraint, or it just requires that the types be orderable but can be a row or another type (so the variadic bound is null)?

Contributor Author

Is this the logic desired?

            // The type variable must be directly a ROW type,
            // or a type alias that is not bounded by any type,
            // or a type alias that maps to a row type
            boolean meetsTypeConstraint = (!typeConstraintMapping.containsKey(typeVariableName) && typeVariableName.equals(com.facebook.presto.common.type.StandardTypes.ROW)) ||
                    (typeConstraintMapping.containsKey(typeVariableName) && typeConstraintMapping.get(typeVariableName).getVariadicBound() == null && !typeConstraintMapping.get(typeVariableName).isNonDecimalNumericRequired()) ||
                    (typeConstraintMapping.containsKey(typeVariableName) && com.facebook.presto.common.type.StandardTypes.ROW.equals(typeConstraintMapping.get(typeVariableName).getVariadicBound()));

Contributor

yeah, that looks good

checkCondition(pushdownSubfieldArgIndex.get() >= 0, FUNCTION_IMPLEMENTATION_ERROR, "Method [%s] has negative pushdown subfield arg index", method);
checkCondition(signature.getArgumentTypes().size() > pushdownSubfieldArgIndex.get(), FUNCTION_IMPLEMENTATION_ERROR, "Method [%s] has out of range pushdown subfield arg index", method);
String typeVariableName = signature.getArgumentTypes().get(pushdownSubfieldArgIndex.get()).toString();
checkCondition(typeVariableName.equals(com.facebook.presto.common.type.StandardTypes.ROW) || typeConstraintMapping.get(typeVariableName).equals(com.facebook.presto.common.type.StandardTypes.ROW), FUNCTION_IMPLEMENTATION_ERROR, "Method [%s] does not have a struct or row type as pushdown subfield arg", method);
Contributor

Same comment: maybe this is too restrictive and needs some other conditions.

@facebook-github-bot (Collaborator)

@kevintang2022 has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.


@facebook-github-bot (Collaborator)

@kevintang2022 has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@kevintang2022 kevintang2022 requested a review from rschlussel June 3, 2025 18:08
@kevintang2022 kevintang2022 merged commit f4f1934 into prestodb:master Jun 4, 2025
100 checks passed